In [1]:
import pandas as pd
import csv

from math import isnan
import numpy as np

import plotly
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import plotly.express as px

import datetime
from datetime import datetime, date, timedelta
import time
import findspark

import pyspark
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import pyspark.sql.functions as f
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType,FloatType

from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import year,month,dayofmonth
from pyspark.sql.functions import to_date
from pyspark.sql.functions import col,round
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.
In [2]:
spark=SparkSession.builder.getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/07 19:41:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/07 19:41:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
In [3]:
file_path = '/Users/kruthikk/Documents/CSP 554 Big Data/Final Project/CTA_-_Ridership_-_Daily_Boarding_Totals.csv'
df=spark.read.csv(file_path,inferSchema=True,header=True)
df.show()
print(df)
df=df.withColumn("service_date",df["service_date"].cast(DateType()))
+-------------------+--------+---------+--------------+-----------+
|       service_date|day_type|      bus|rail_boardings|total_rides|
+-------------------+--------+---------+--------------+-----------+
|2001-01-01 00:00:00|       U|  297,192|       126,455|    423,647|
|2001-01-02 00:00:00|       W|  780,827|       501,952|  1,282,779|
|2001-01-03 00:00:00|       W|  824,923|       536,432|  1,361,355|
|2001-01-04 00:00:00|       W|  870,021|       550,011|  1,420,032|
|2001-01-05 00:00:00|       W|  890,426|       557,917|  1,448,343|
|2001-01-06 00:00:00|       A|  577,401|       255,356|    832,757|
|2001-01-07 00:00:00|       U|  375,831|       169,825|    545,656|
|2001-01-08 00:00:00|       W|  985,221|       590,706|  1,575,927|
|2001-01-09 00:00:00|       W|  978,377|       599,905|  1,578,282|
|2001-01-10 00:00:00|       W|  984,884|       602,052|  1,586,936|
|2001-01-11 00:00:00|       W|  995,561|       607,503|  1,603,064|
|2001-01-12 00:00:00|       W|1,018,985|       605,252|  1,624,237|
|2001-01-13 00:00:00|       A|  591,791|       270,056|    861,847|
|2001-01-14 00:00:00|       U|  373,091|       174,842|    547,933|
|2001-01-15 00:00:00|       W|  675,845|       412,149|  1,087,994|
|2001-01-16 00:00:00|       W|1,024,367|       622,163|  1,646,530|
|2001-01-17 00:00:00|       W|1,018,690|       620,343|  1,639,033|
|2001-01-18 00:00:00|       W|1,006,996|       618,832|  1,625,828|
|2001-01-19 00:00:00|       W|  909,964|       583,851|  1,493,815|
|2001-01-20 00:00:00|       A|  582,348|       263,815|    846,163|
+-------------------+--------+---------+--------------+-----------+
only showing top 20 rows

DataFrame[service_date: timestamp, day_type: string, bus: string, rail_boardings: string, total_rides: string]
In [4]:
########## Type cast string column to int column in pyspark
df=df.withColumn('bus', regexp_replace('bus', ',', ''))
df=df.withColumn('rail_boardings', regexp_replace('rail_boardings', ',', ''))
df=df.withColumn('total_rides', regexp_replace('total_rides', ',', ''))
df=df.withColumn('day_type', regexp_replace('day_type','A', 'Sat'))
df=df.withColumn('day_type', regexp_replace('day_type','W', 'Weekday'))
df=df.withColumn('day_type', regexp_replace('day_type','U', 'Sun-Holiday'))
#df=df.withColumn('service_date',year(df.service_date))

df4 = df.select(
    df.service_date,
    df.day_type,
    df.bus.cast(IntegerType()).alias('bus'),
    df.rail_boardings.cast(IntegerType()).alias('rail_boardings'),
    df.total_rides.cast(IntegerType()).alias('total_rides'),
)

df4=df4.withColumn('service_date',year(df.service_date))

df4_sum_bus_rail=df4.groupBy("service_date").sum("bus","rail_boardings")

df4.printSchema()
df4.show()
df4_sum_bus_rail.show()
root
 |-- service_date: integer (nullable = true)
 |-- day_type: string (nullable = true)
 |-- bus: integer (nullable = true)
 |-- rail_boardings: integer (nullable = true)
 |-- total_rides: integer (nullable = true)

+------------+-----------+-------+--------------+-----------+
|service_date|   day_type|    bus|rail_boardings|total_rides|
+------------+-----------+-------+--------------+-----------+
|        2001|Sun-Holiday| 297192|        126455|     423647|
|        2001|    Weekday| 780827|        501952|    1282779|
|        2001|    Weekday| 824923|        536432|    1361355|
|        2001|    Weekday| 870021|        550011|    1420032|
|        2001|    Weekday| 890426|        557917|    1448343|
|        2001|        Sat| 577401|        255356|     832757|
|        2001|Sun-Holiday| 375831|        169825|     545656|
|        2001|    Weekday| 985221|        590706|    1575927|
|        2001|    Weekday| 978377|        599905|    1578282|
|        2001|    Weekday| 984884|        602052|    1586936|
|        2001|    Weekday| 995561|        607503|    1603064|
|        2001|    Weekday|1018985|        605252|    1624237|
|        2001|        Sat| 591791|        270056|     861847|
|        2001|Sun-Holiday| 373091|        174842|     547933|
|        2001|    Weekday| 675845|        412149|    1087994|
|        2001|    Weekday|1024367|        622163|    1646530|
|        2001|    Weekday|1018690|        620343|    1639033|
|        2001|    Weekday|1006996|        618832|    1625828|
|        2001|    Weekday| 909964|        583851|    1493815|
|        2001|        Sat| 582348|        263815|     846163|
+------------+-----------+-------+--------------+-----------+
only showing top 20 rows

[Stage 4:>                                                          (0 + 1) / 1]
+------------+---------+-------------------+
|service_date| sum(bus)|sum(rail_boardings)|
+------------+---------+-------------------+
|        2003|291805659|          181135096|
|        2007|309271308|          190273414|
|        2018|242173004|          225894953|
|        2015|274288770|          241676065|
|        2006|298433226|          195169313|
|        2022| 77657741|           56751335|
|        2013|300116373|          228684399|
|        2014|298731943|          258884389|
|        2019|237276399|          218467141|
|        2004|294030716|          178716468|
|        2020|121449920|           76049871|
|        2012|314423583|          231154339|
|        2009|318672787|          202569040|
|        2016|259058437|          238645812|
|        2001|301712869|          181692888|
|        2005|303244172|          186759525|
|        2010|306023984|          210849074|
|        2011|338054144|          241866109|
|        2008|328199217|          198137253|
|        2017|249231176|          230204047|
+------------+---------+-------------------+
only showing top 20 rows

                                                                                
In [5]:
df_bus_graph = ps.DataFrame(df4_sum_bus_rail)
df_bus_graph.plot.bar(x='service_date', y='sum(bus)',title='Sum Of Bus Rides Per Year',color='sum(bus)') 
In [6]:
df_rail_graph = ps.DataFrame(df4_sum_bus_rail)
df_rail_graph.plot.bar(x='service_date', y='sum(rail_boardings)',title='Sum Of Rail Rides Per Year',color='sum(rail_boardings)') 
In [7]:
df_rail_graph = ps.DataFrame(df4_sum_bus_rail)
df_rail_graph.plot.bar(x='service_date', y=['sum(bus)','sum(rail_boardings)'],barmode='group',title='Sum of Rides Yearly Bus vs Rail')
In [8]:
df_date_split= df.select(col("service_date"), 
     year(col("service_date")).alias("year"), 
     month(col("service_date")).alias("month"), 
     dayofmonth(col("service_date")).alias("day"),
     df.day_type,
     df.bus.cast(IntegerType()).alias('bus'),
     df.rail_boardings.cast(IntegerType()).alias('rail_boardings'),                    
     df.total_rides.cast(IntegerType()).alias('total_rides'),
  )
df_date_split.show()

print(df_date_split)
+------------+----+-----+---+-----------+-------+--------------+-----------+
|service_date|year|month|day|   day_type|    bus|rail_boardings|total_rides|
+------------+----+-----+---+-----------+-------+--------------+-----------+
|  2001-01-01|2001|    1|  1|Sun-Holiday| 297192|        126455|     423647|
|  2001-01-02|2001|    1|  2|    Weekday| 780827|        501952|    1282779|
|  2001-01-03|2001|    1|  3|    Weekday| 824923|        536432|    1361355|
|  2001-01-04|2001|    1|  4|    Weekday| 870021|        550011|    1420032|
|  2001-01-05|2001|    1|  5|    Weekday| 890426|        557917|    1448343|
|  2001-01-06|2001|    1|  6|        Sat| 577401|        255356|     832757|
|  2001-01-07|2001|    1|  7|Sun-Holiday| 375831|        169825|     545656|
|  2001-01-08|2001|    1|  8|    Weekday| 985221|        590706|    1575927|
|  2001-01-09|2001|    1|  9|    Weekday| 978377|        599905|    1578282|
|  2001-01-10|2001|    1| 10|    Weekday| 984884|        602052|    1586936|
|  2001-01-11|2001|    1| 11|    Weekday| 995561|        607503|    1603064|
|  2001-01-12|2001|    1| 12|    Weekday|1018985|        605252|    1624237|
|  2001-01-13|2001|    1| 13|        Sat| 591791|        270056|     861847|
|  2001-01-14|2001|    1| 14|Sun-Holiday| 373091|        174842|     547933|
|  2001-01-15|2001|    1| 15|    Weekday| 675845|        412149|    1087994|
|  2001-01-16|2001|    1| 16|    Weekday|1024367|        622163|    1646530|
|  2001-01-17|2001|    1| 17|    Weekday|1018690|        620343|    1639033|
|  2001-01-18|2001|    1| 18|    Weekday|1006996|        618832|    1625828|
|  2001-01-19|2001|    1| 19|    Weekday| 909964|        583851|    1493815|
|  2001-01-20|2001|    1| 20|        Sat| 582348|        263815|     846163|
+------------+----+-----+---+-----------+-------+--------------+-----------+
only showing top 20 rows

DataFrame[service_date: date, year: int, month: int, day: int, day_type: string, bus: int, rail_boardings: int, total_rides: int]
In [9]:
df_xmas=df_date_split.filter((df_date_split.month==12) & (df_date_split.day==25))
df_xmas.show()
+------------+----+-----+---+-----------+------+--------------+-----------+
|service_date|year|month|day|   day_type|   bus|rail_boardings|total_rides|
+------------+----+-----+---+-----------+------+--------------+-----------+
|  2001-12-25|2001|   12| 25|Sun-Holiday|218872|         93525|     312397|
|  2002-12-25|2002|   12| 25|Sun-Holiday|237547|         96846|     334393|
|  2003-12-25|2003|   12| 25|Sun-Holiday|247349|        101016|     348365|
|  2004-12-25|2004|   12| 25|Sun-Holiday|213912|         87992|     301904|
|  2005-12-25|2005|   12| 25|Sun-Holiday|263158|        105718|     368876|
|  2006-12-25|2006|   12| 25|Sun-Holiday|242369|        112855|     355224|
|  2007-12-25|2007|   12| 25|Sun-Holiday|249699|        111582|     361281|
|  2008-12-25|2008|   12| 25|Sun-Holiday|238727|        113322|     352049|
|  2009-12-25|2009|   12| 25|Sun-Holiday|249347|        121545|     370892|
|  2010-12-25|2010|   12| 25|Sun-Holiday|233266|        128261|     361527|
|  2011-12-25|2011|   12| 25|Sun-Holiday|273257|        144879|     418136|
|  2012-12-25|2012|   12| 25|Sun-Holiday|242609|        143166|     385775|
|  2013-12-25|2013|   12| 25|Sun-Holiday|218166|        144912|     363078|
|  2014-12-25|2014|   12| 25|Sun-Holiday|238015|        159831|     397846|
|  2015-12-25|2015|   12| 25|Sun-Holiday|231718|        162100|     393818|
|  2016-12-25|2016|   12| 25|Sun-Holiday|199220|        131542|     330762|
|  2017-12-25|2017|   12| 25|Sun-Holiday|164256|        118654|     282910|
|  2018-12-25|2018|   12| 25|Sun-Holiday|188696|        127994|     316690|
|  2019-12-25|2019|   12| 25|Sun-Holiday|200121|        133009|     333130|
|  2020-12-25|2020|   12| 25|Sun-Holiday| 80783|         40861|     121644|
+------------+----+-----+---+-----------+------+--------------+-----------+
only showing top 20 rows

In [10]:
df_xmas_graph = ps.DataFrame(df_xmas)
#df_xmas_graph.plot.line() 
df_xmas_graph.plot.line(x="year", y='total_rides',title='Sum Of Rides During Xmas') 
In [11]:
df_avg_bus_rail=df_date_split.groupBy("year","day_type").avg("total_rides").alias("avg_total_rides")
##df.groupBy("device").agg(round(mean("percent"),2).alias("y")).show()
##round(mean("percent"),2).alias("y")
df_avg_bus_rail.show()
+----+-----------+------------------+
|year|   day_type|  avg(total_rides)|
+----+-----------+------------------+
|2013|        Sat|1068307.6153846155|
|2017|    Weekday| 1540769.161417323|
|2005|Sun-Holiday| 658513.9310344828|
|2020|Sun-Holiday| 297730.3793103448|
|2015|Sun-Holiday|  760211.275862069|
|2018|    Weekday|1507281.5215686276|
|2020|        Sat| 397918.3846153846|
|2015|        Sat|1027862.9038461539|
|2009|        Sat|1058356.4038461538|
|2021|        Sat| 447709.4423076923|
|2014|Sun-Holiday| 768201.1746031746|
|2019|        Sat| 855269.6153846154|
|2002|    Weekday|1569327.7529411765|
|2009|Sun-Holiday|  748777.551724138|
|2007|Sun-Holiday| 674850.1724137932|
|2007|    Weekday|1606361.0862745098|
|2016|        Sat| 972639.4905660377|
|2008|    Weekday|    1679117.390625|
|2012|Sun-Holiday| 801840.2033898305|
|2018|        Sat| 884775.0192307692|
+----+-----------+------------------+
only showing top 20 rows

In [12]:
df_avg_bus_rail_graph = ps.DataFrame(df_avg_bus_rail)
df_avg_bus_rail_pivot = df_avg_bus_rail_graph.pivot(index='year', columns='day_type', values='avg(total_rides)')
print(df_avg_bus_rail_pivot.head(10))
[Stage 35:>                                                         (0 + 1) / 1]
day_type           Sat    Sun-Holiday       Weekday
year                                               
2003      9.060174e+05  603536.465517  1.532638e+06
2007      9.765641e+05  674850.172414  1.606361e+06
2018      8.847750e+05  650049.448276  1.507282e+06
2015      1.027863e+06  760211.275862  1.640877e+06
2006      9.645448e+05  660497.525424  1.592429e+06
2022      4.911355e+05  375926.272727  7.214750e+05
2013      1.068308e+06  763523.879310  1.682213e+06
2014      1.044622e+06  768201.174603  1.627151e+06
2019      8.552696e+05  633502.500000  1.468731e+06
2004      9.204811e+05  615553.948276  1.517901e+06
                                                                                
In [13]:
df_avg_bus_rail_pivot.plot.bar(y=['Sat','Sun-Holiday','Weekday'],title='Avg Of Rides by Day Type',barmode='group') 
In [ ]: